package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.atguigu.bean.TrafficPageViewBean;
import com.atguigu.common.Constant;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.DorisUtil;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

//数据流:web/app -> Nginx -> 日志服务器(file) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> Doris
//程  序:Mock -> Nginx(日志服务器、文件) -> f1.sh -> Kafka(ZK) -> DwdTrafficBaseLogSplit -> Kafka(ZK) -> Dws02_TrafficVcChArIsNewPageViewWindow -> Doris
public class Dws02_TrafficVcChArIsNewPageViewWindow {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启CK
        env.enableCheckpointing(10000L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(20000L);
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //checkpointConfig.setCheckpointInterval(10000L);
        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
        checkpointConfig.setMaxConcurrentCheckpoints(2);
        //默认是int类型的最大值
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        env.setStateBackend(new HashMapStateBackend());

        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //2.读取Kafka DWD层页面日志主题数据创建流  提取事件时间
        DataStreamSource<String> kafkaDS = env.fromSource(KafkaUtil.getKafkaSource(Constant.TOPIC_DWD_TRAFFIC_PAGE, "vc_ch_ar_isnew_page_view_230315"),
                WatermarkStrategy.noWatermarks(),
                "kafka-source");

        //3.过滤并转换为JSON对象
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                if (!"".equals(value)) {
                    try {
                        JSONObject jsonObject = JSON.parseObject(value);
                        out.collect(jsonObject);
                    } catch (JSONException e) {
                        System.out.println("脏数据：" + value);
                    }
                }
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
            @Override
            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                return element.getLong("ts");
            }
        }));

        //4.按照Mid进行分组
        KeyedStream<JSONObject, String> keyedByMidStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        //5.去重  mid  sid 并将数据转换为JavaBean对象
        SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewDS = keyedByMidStream.map(new RichMapFunction<JSONObject, TrafficPageViewBean>() {

            private ValueState<String> lastVisitDtState;
            private MapState<String, String> sidState;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig valueTtlConfig = new StateTtlConfig.Builder(Time.days(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                        .build();
                ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("last-visit-dt", String.class);
                valueStateDescriptor.enableTimeToLive(valueTtlConfig);
                lastVisitDtState = getRuntimeContext().getState(valueStateDescriptor);

                StateTtlConfig mapTtlConfig = new StateTtlConfig.Builder(Time.minutes(2))
                        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                        .build();
                MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("sid-map", String.class, String.class);
                mapStateDescriptor.enableTimeToLive(mapTtlConfig);
                sidState = getRuntimeContext().getMapState(mapStateDescriptor);
            }

            @Override
            public TrafficPageViewBean map(JSONObject value) throws Exception {

                //取出相关数据
                JSONObject common = value.getJSONObject("common");
                Long ts = value.getLong("ts");
                String curDt = DateFormatUtil.toDate(ts);

                String lastDt = lastVisitDtState.value();
                long uv = 0L;
                if (lastDt == null || !lastDt.equals(curDt)) {
                    lastVisitDtState.update(curDt);
                    uv = 1L;
                }

                long sv = 0L;
                String sid = common.getString("sid");
                if (!sidState.contains(sid)) {
                    sidState.put(sid, "");
                    sv = 1L;
                }

                return new TrafficPageViewBean("", "",
                        common.getString("vc"),
                        common.getString("ch"),
                        common.getString("ar"),
                        common.getString("is_new"),
                        curDt,
                        uv,
                        sv,
                        1L,
                        value.getJSONObject("page").getLong("during_time"));
            }
        });

        //6.分组开窗聚合
        SingleOutputStreamOperator<TrafficPageViewBean> resultDS = trafficPageViewDS.keyBy(new KeySelector<TrafficPageViewBean, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> getKey(TrafficPageViewBean value) throws Exception {
                        return new Tuple4<>(value.getIsNew(),
                                value.getAr(),
                                value.getCh(),
                                value.getVc());
                    }
                }).window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TrafficPageViewBean>() {
                    @Override
                    public TrafficPageViewBean reduce(TrafficPageViewBean value1, TrafficPageViewBean value2) throws Exception {
                        value1.setUvCt(value1.getUvCt() + value2.getUvCt());
                        value1.setPvCt(value1.getPvCt() + value2.getPvCt());
                        value1.setSvCt(value1.getSvCt() + value2.getSvCt());
                        value1.setDurSum(value1.getDurSum() + value2.getDurSum());
                        return value1;
                    }
                }, new WindowFunction<TrafficPageViewBean, TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow>() {
                    @Override
                    public void apply(Tuple4<String, String, String, String> key, TimeWindow window, Iterable<TrafficPageViewBean> input, Collector<TrafficPageViewBean> out) throws Exception {

                        TrafficPageViewBean next = input.iterator().next();

                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));

                        out.collect(next);
                    }
                });

        //7.将数据写出到Doris
        resultDS.print(">>>>>>>");
        resultDS.map(bean -> {
                    SerializeConfig config = new SerializeConfig();
                    config.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;  // 转成json的时候, 属性名使用下划线
                    return JSON.toJSONString(bean, config);
                })
                .sinkTo(DorisUtil.getDorisSink("dws_traffic_vc_ch_ar_is_new_page_view_window"));

        //8.启动
        env.execute("Dws02_TrafficVcChArIsNewPageViewWindow");

    }

}
